Spark: Support aggregate pushdown for identity partition column GROUP BY #16176
hemanthboyina wants to merge 3 commits into apache:main
Conversation
| Map<List<Object>, AggregateEvaluator> evaluatorsByPartition =
|     groupFilesByPartition(spec, groupByPositions, boundAggregates);
I am not confident this is correct. Also, we are only checking the current partitioning; a table can contain files written under many different partition specs that evolved across snapshots.
Thanks for the review @singhpk234. You raised a valid point: the current implementation only considers the current partition spec and bails out for files from other specs. I will look into handling spec evolution properly and update the PR.
Handled the partition spec evolution case. Can you please review?
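To illustrate why spec evolution matters here, the sketch below uses hypothetical stand-in types (not Iceberg's `PartitionSpec` API): pushdown is only safe when every spec that produced a scanned file partitions each GROUP BY column by identity, so a single check against the current spec is not enough.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each spec is modeled as column name -> transform name
// ("identity", "bucket", ...). Real code would inspect PartitionSpec fields.
public class SpecEvolutionCheck {
  static boolean safeToPushDown(
      List<Map<String, String>> specsOfScannedFiles, List<String> groupByColumns) {
    for (Map<String, String> spec : specsOfScannedFiles) {
      for (String col : groupByColumns) {
        if (!"identity".equals(spec.get(col))) {
          // some scanned file was written under a spec that does not
          // identity-partition this GROUP BY column: fall back to a normal scan
          return false;
        }
      }
    }
    return true;
  }
}
```

If any snapshot in the scan contains files from an older spec (say, one that bucketed the column instead), the check fails and the aggregate must be computed from row data.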
anuragmantri
left a comment
Thanks for the useful PR @hemanthboyina. Overall, it looks good to me. I made some suggestions.
| return -1;
| }
|
| private boolean allGroupByAreIdentityPartitionFields(Aggregation aggregation) {
allGroupByAreIdentityPartitionFields() and resolveGroupByFields() look very similar, except:
- allGroupByAreIdentityPartitionFields() additionally checks instanceof NamedReference
- resolveGroupByFields() additionally collects field IDs and fields into output lists

Can we merge these two? Or maybe let canPushDownAggregation() allow GROUP BY and then have the checks in this merged method? What do you think?
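One shape the merged method could take, sketched with local stand-in types rather than Spark's Aggregation/NamedReference: a single pass that both validates the GROUP BY expressions and resolves them, returning empty when validation fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: a String stands in for a resolvable NamedReference,
// and any other expression type stands in for something we cannot push down.
public class GroupByResolver {
  static Optional<List<String>> resolveGroupBy(
      List<Object> groupByExpressions, Set<String> identityPartitionColumns) {
    List<String> resolved = new ArrayList<>();
    for (Object expr : groupByExpressions) {
      if (!(expr instanceof String)) {
        return Optional.empty(); // stands in for the instanceof NamedReference check
      }
      String name = (String) expr;
      if (!identityPartitionColumns.contains(name)) {
        return Optional.empty(); // not an identity partition column
      }
      resolved.add(name); // stands in for collecting field IDs / fields
    }
    return Optional.of(resolved);
  }
}
```

Returning `Optional.empty()` on failure lets a caller like canPushDownAggregation() use one call for both the boolean check and the resolved output.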
| return true;
| }
|
| private static class ArrayStructLike implements StructLike {
Can we use AggregateEvaluator.ArrayStructLike instead? May have to make it package-private.
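For context, a struct-like wrapper of this kind is typically a thin view over an `Object[]` with value-based equality so it can key a map of per-group evaluators. The sketch below uses a local stand-in interface, not Iceberg's `org.apache.iceberg.StructLike` or the actual `AggregateEvaluator.ArrayStructLike`.

```java
import java.util.Arrays;

public class ArrayStructDemo {
  // local stand-in for the StructLike interface
  interface Struct {
    int size();
    <T> T get(int pos, Class<T> javaClass);
  }

  static class ArrayStruct implements Struct {
    private final Object[] values;

    ArrayStruct(Object[] values) {
      this.values = values;
    }

    @Override
    public int size() {
      return values.length;
    }

    @Override
    public <T> T get(int pos, Class<T> javaClass) {
      return javaClass.cast(values[pos]);
    }

    // value-based equality/hashCode so two structs built from equal partition
    // tuples land in the same HashMap bucket of per-group evaluators
    @Override
    public boolean equals(Object other) {
      return other instanceof ArrayStruct && Arrays.equals(values, ((ArrayStruct) other).values);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(values);
    }
  }
}
```

Reusing the existing nested class (made package-private) would avoid maintaining two copies of this equality logic.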
| @@ -568,11 +568,9 @@ public void testAggregationPushdownOnBucketedColumn() {
| sql(
|     "CREATE TABLE %s (id BIGINT, struct_with_int STRUCT<c1:INT>) USING iceberg PARTITIONED BY (bucket(8, id))",
|     tableName);
Nit: Unrelated whitespace change.
| @@ -909,4 +907,183 @@ public void testAggregatePushDownForIncrementalScan() {
| assertEquals(
|     "min/max/count push down", expected2, rowsToJava(unboundedPushdownDs.collectAsList()));
| }
| @TestTemplate
| public void testGroupByIdentityPartitionColumnCountPushDown() {
Can we also verify that the EXPLAIN string shows the pushdown, like the other tests do?
| }
|
| @TestTemplate
| public void testGroupByIdentityPartitionColumnWithMinMax() {
Same here, can we also have explain string verification?
This PR enables aggregate pushdown for queries with GROUP BY on identity partition columns. Currently, Iceberg supports pushing down aggregates (COUNT, MIN, MAX) for queries without GROUP BY, computing results from file metadata instead of reading data files. However, when a query includes GROUP BY, the pushdown is disabled even when the GROUP BY columns are identity partition fields.
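The core idea can be shown with a small stand-in example (hypothetical per-file stats, not Iceberg's manifest API): when the GROUP BY column is an identity partition column, COUNT(*) per group is just the sum of each data file's record count, keyed by the file's partition value, so no data rows need to be read.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetadataCount {
  // input: partition value -> record counts of the data files in that partition
  // (as would be read from file-level metadata, not from the rows themselves)
  static Map<Object, Long> countByPartition(Map<Object, List<Long>> recordCountsByPartition) {
    Map<Object, Long> counts = new HashMap<>();
    recordCountsByPartition.forEach(
        (partition, recordCounts) ->
            counts.put(partition, recordCounts.stream().mapToLong(Long::longValue).sum()));
    return counts;
  }
}
```

MIN/MAX work analogously from per-file lower/upper bounds, which is why the pushdown in this PR is restricted to identity partition columns: only there does a file's partition value equal the column value for every row in the file.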